package cn.youfule.mecs.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;

/**
 * @author fortuneju
 */
@Slf4j
public class ChannelSupervise {
    private static ChannelGroup GLOBAL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap();

    public  static void addChannel(String key,Channel channel){
        GLOBAL_GROUP.add(channel);
        // CHANNEL_MAP.put(channel.id().asShortText(),channel.id());
        CHANNEL_MAP.put(key, channel.id());
    }
    public static void removeChannel(String key){
        GLOBAL_GROUP.remove(findChannel(key));
        CHANNEL_MAP.remove(key);
    }
    public static  Channel findChannel(String key){
        ChannelId id = CHANNEL_MAP.get(key);
        if(id == null){
            return null;
        }
        return GLOBAL_GROUP.find(id);
    }
    public static void send2All(Object msg){
        GLOBAL_GROUP.writeAndFlush(msg);
    }
    public static MecsProtocol send2Single(String key, MecsProtocol prot){
        Channel channel = findChannel(key);
        if(channel == null){
            log.error("设备不在线："+key);
            return null;
        }
        String receiveKey = receiveKey(prot);
        //初始化队列
        initReceiveMsg(receiveKey);
        //发送数据
        channel.writeAndFlush(prot);
        //等待返回结果
        MecsProtocol inbound = waitReceiveMsg(receiveKey);
        return inbound;
    }
    public static List<MecsProtocol> send2Multiple(Map<String,MecsProtocol> map){
        List<String> receiveKeyList = new ArrayList<>();
        for (Map.Entry<String, MecsProtocol> entry:map.entrySet()) {
            Channel channel = findChannel(entry.getKey());
            if(channel == null){
                log.error("设备不在线："+entry.getKey());
                continue;
            }
            String receiveKey = receiveKey(entry.getValue());
            //初始化队列
            initReceiveMsg(receiveKey);
            //发送数据
            channel.writeAndFlush(entry.getValue());
            receiveKeyList.add(receiveKey);
        }
        List<MecsProtocol> result = new ArrayList<>();
        for (String key:receiveKeyList){
            result.add(waitReceiveMsg(key));
        }
        return result;
    }

    /**
     * 响应消息缓存
     * TODO 需要替换为可以设置过期时间的缓存工具
     */
    private static ConcurrentMap<String, BlockingQueue<MecsProtocol>> responseMsgCache =  new ConcurrentHashMap<>();
    /*CacheBuilder.newBuilder()
            .maximumSize(50000)
            .expireAfterWrite(1000, TimeUnit.SECONDS)
            .build();*/

    /**
     * 相应消息缓存的key
     * @param prot
     * @return
     */
    public static String receiveKey(MecsProtocol prot){
        return new StringBuilder()
                .append(prot.getId())
                .append(prot.getFactory())
                .append(prot.getType())
                .append(prot.getFunctionCode())
                .toString();
    }
    /**
     * 初始化响应消息的队列
     * @param key 消息唯一标识
     */
    public static void initReceiveMsg(String key) {
        responseMsgCache.put(key,new LinkedBlockingQueue<MecsProtocol>(1));
    }

    /**
     * 设置响应消息
     * @param msg
     */
    public static void setReceiveMsg(MecsProtocol msg){
        //消息唯一标识
        String key = receiveKey(msg);
        if(responseMsgCache.get(key) != null){
            responseMsgCache.get(key).add(msg);
        }
    }

    /**
     * 等待响应消息
     * @param key 消息唯一标识
     * @return ReceiveDdcMsgVo
     */
    public static MecsProtocol waitReceiveMsg(String key) {
        try {
            //设置超时时间
            MecsProtocol vo = Objects.requireNonNull(responseMsgCache.get(key))
                    .poll(10000, TimeUnit.MILLISECONDS);

            //删除key
            responseMsgCache.remove(key);
            return vo;
        } catch (Exception e) {
            log.error("获取数据异常,sn={},msg=null",key);
        }
        return null;
    }

}
